<?php

namespace xunwu659\WebmanQueue;

use Exception;
use RedisException;
use xunwu659\WebmanQueue\Exceptions\ScheduleDelayedMessageException;
use xunwu659\WebmanQueue\Interface\ConsumerMessageInterface;
use xunwu659\WebmanQueue\Interface\QueueMessageInterface;
use xunwu659\WebmanQueue\Interface\QueueProducerInterface;
use xunwu659\WebmanQueue\Log\LogUtility;
use xunwu659\WebmanQueue\Queue\Factory\QueueMessageFactory;
use xunwu659\WebmanQueue\Queue\Factory\QueueProducerFactory;
use xunwu659\WebmanQueue\Queue\QueueUtility;
use xunwu659\WebmanQueue\Redis\Redis;
use xunwu659\WebmanQueue\Redis\RedisConnection;
use Throwable;

/**
 * Base class for queue consumers.
 *
 * Holds the configuration of one consumer (connection, stream/group names,
 * retry and pending-list settings), exposes it through getters and setters,
 * and provides default handling for failed messages, pending timeouts and
 * dead letters. Concrete consumers extend this class and implement
 * {@see Consumer::consume()}.
 */
abstract class Consumer
{
    /**
     * Pending strategy: do not retry. After the pending timeout the message
     * is acknowledged and handed to the dead-letter handler.
     */
    public const PENDING_PROCESSING_IGNORE = 0;

    /**
     * Pending strategy (default): attempt to retry the message after the
     * pending timeout.
     */
    public const PENDING_PROCESSING_RETRY = 1;

    /**
     * Connection identifier.
     * @var string
     */
    protected string $connection = 'default';

    /**
     * Redis connection, resolved from the connection identifier in the constructor.
     * @var Redis|RedisConnection
     */
    protected Redis|RedisConnection $redisConnection;

    /**
     * Queue name; generated from the class name when left empty.
     * @var string
     */
    protected string $queueName = '';

    /**
     * Queue group name; generated from the class name when left empty.
     * @var string
     */
    protected string $groupName = '';

    /**
     * Stream key; generated from the class name when left empty.
     * @var string
     */
    protected string $streamKey = '';

    /**
     * Maximum number of messages to return per read. Default is 1.
     * Increasing the number of consumer processes is recommended over
     * increasing this value.
     * @var int
     */
    protected int $prefetchCount = 1;

    /**
     * Block time in milliseconds when waiting for new messages. Default is 5000ms.
     * For delay queues, this should be close to the delay interval.
     * @var int
     */
    protected int $blockTime = 5000;

    /**
     * Consumer timer interval in seconds. Default is 0.5.
     * NOTE(review): presumably the pause between two consume cycles; the timer
     * itself is not part of this class - confirm against the worker.
     * @var float
     */
    protected float $consumerTimerInterval = 0.5;

    /**
     * Maximum retry attempts after a consumption failure.
     * @var int
     */
    protected int $maxAttempts = 5;

    /**
     * Base retry interval in seconds. The effective delay of a retry is this
     * value multiplied by the message's fail count.
     * @var int
     */
    protected int $retrySeconds = 60;

    /**
     * Whether messages are acknowledged automatically.
     * NOTE(review): presumably, when this is false, the business logic has to
     * call $consumerMessage->ack() itself - confirm against the worker.
     * @var bool
     */
    protected bool $autoAck = true;

    /**
     * Delete the message immediately after a successful ack.
     * @var bool
     */
    protected bool $autoDel = true;

    /**
     * Number of delayed queue messages processed at once.
     * @var int
     */
    protected int $delayedQueueOnceHandlerCount = 128;

    /**
     * Interval in seconds for processing delayed messages. Default is 1.
     * @var int
     */
    protected int $delayedMessagesTimerInterval = 1;

    /**
     * Maximum number of workers for delayed messages. Default is a single worker.
     * 0 disables delayed queue processing, -1 enables it on all processes.
     * @var int
     */
    protected int $delayedMessagesMaxWorkerCount = 1;

    /**
     * Delayed queue SET key; generated from the class name when left empty.
     * @var string
     */
    protected string $delayedTaskSetKey = '';

    /**
     * Delayed queue HASH key; generated from the class name when left empty.
     * @var string
     */
    protected string $delayedDataHashKey = '';

    /**
     * Strategy applied to pending messages that exceed $pendingTimout.
     * self::PENDING_PROCESSING_RETRY retries the message after the timeout.
     * self::PENDING_PROCESSING_IGNORE acknowledges it and invokes dead-letter handling.
     * @var int
     */
    protected int $pendingProcessingStrategy = self::PENDING_PROCESSING_RETRY;

    /**
     * Timeout in seconds for pending messages.
     * (The property name keeps its historical spelling for compatibility.)
     * @var int
     */
    protected int $pendingTimout = 300;

    /**
     * Interval in seconds for checking the pending list.
     * @var int
     */
    protected int $checkPendingTimerInterval = 60;

    /**
     * Number of pending messages checked at once.
     * @var int
     */
    protected int $onceCheckPendingCount = 50;

    /**
     * Resolves the Redis connection and fills in every name/key that the
     * subclass left empty with a value generated from the concrete class name.
     *
     * @throws Throwable
     * @throws RedisException
     */
    public function __construct()
    {
        // Compare with '' explicitly: empty() would also discard the valid name '0'.
        if ($this->connection === '') {
            $this->setConnection('default');
        }

        $className = static::class;

        // Initialize the redis connection
        $this->redisConnection = Redis::getInstance($this->getConnection());

        // Generate only the identifiers that were not configured by the subclass.
        if ($this->queueName === '') {
            $this->queueName = QueueUtility::generateQueueName($className);
        }
        if ($this->groupName === '') {
            $this->groupName = QueueUtility::generateGroupName($className);
        }
        if ($this->streamKey === '') {
            $this->streamKey = QueueUtility::generateStreamKey($className);
        }
        if ($this->delayedTaskSetKey === '') {
            $this->delayedTaskSetKey = QueueUtility::generateDelayedTaskSetKey($className);
        }
        if ($this->delayedDataHashKey === '') {
            $this->delayedDataHashKey = QueueUtility::generateDelayedDataHashKey($className);
        }
    }

    /**
     * @return string Connection identifier
     */
    public function getConnection(): string
    {
        return $this->connection;
    }

    /**
     * Sets the connection identifier. This does not replace the Redis
     * connection that was already resolved; use setRedisConnection() for that.
     *
     * @param string $connection
     */
    public function setConnection(string $connection): void
    {
        $this->connection = $connection;
    }

    /**
     * @return Redis|RedisConnection
     */
    public function getRedisConnection(): Redis|RedisConnection
    {
        return $this->redisConnection;
    }

    /**
     * @param Redis|RedisConnection $redisConnection
     */
    public function setRedisConnection(Redis|RedisConnection $redisConnection): void
    {
        $this->redisConnection = $redisConnection;
    }

    /**
     * @return string
     */
    public function getQueueName(): string
    {
        return $this->queueName;
    }

    /**
     * @param string $queueName
     */
    public function setQueueName(string $queueName): void
    {
        $this->queueName = $queueName;
    }

    /**
     * @return string
     */
    public function getGroupName(): string
    {
        return $this->groupName;
    }

    /**
     * @param string $groupName
     */
    public function setGroupName(string $groupName): void
    {
        $this->groupName = $groupName;
    }

    /**
     * @return string
     */
    public function getStreamKey(): string
    {
        return $this->streamKey;
    }

    /**
     * @param string $streamKey
     */
    public function setStreamKey(string $streamKey): void
    {
        $this->streamKey = $streamKey;
    }

    /**
     * @return int
     */
    public function getPrefetchCount(): int
    {
        return $this->prefetchCount;
    }

    /**
     * @param int $prefetchCount
     */
    public function setPrefetchCount(int $prefetchCount): void
    {
        $this->prefetchCount = $prefetchCount;
    }

    /**
     * @return int Block time in milliseconds
     */
    public function getBlockTime(): int
    {
        return $this->blockTime;
    }

    /**
     * @param int $blockTime Block time in milliseconds
     */
    public function setBlockTime(int $blockTime): void
    {
        $this->blockTime = $blockTime;
    }

    /**
     * @return float Interval in seconds
     */
    public function getConsumerTimerInterval(): float
    {
        return $this->consumerTimerInterval;
    }

    /**
     * @param float $consumerTimerInterval Interval in seconds
     */
    public function setConsumerTimerInterval(float $consumerTimerInterval): void
    {
        $this->consumerTimerInterval = $consumerTimerInterval;
    }

    /**
     * @return int
     */
    public function getMaxAttempts(): int
    {
        return $this->maxAttempts;
    }

    /**
     * @param int $maxAttempts
     */
    public function setMaxAttempts(int $maxAttempts): void
    {
        $this->maxAttempts = $maxAttempts;
    }

    /**
     * @return int Base retry interval in seconds
     */
    public function getRetrySeconds(): int
    {
        return $this->retrySeconds;
    }

    /**
     * @param int $retrySeconds Base retry interval in seconds
     */
    public function setRetrySeconds(int $retrySeconds): void
    {
        $this->retrySeconds = $retrySeconds;
    }

    /**
     * @return bool
     */
    public function isAutoAck(): bool
    {
        return $this->autoAck;
    }

    /**
     * @param bool $autoAck
     */
    public function setAutoAck(bool $autoAck): void
    {
        $this->autoAck = $autoAck;
    }

    /**
     * @return bool
     */
    public function isAutoDel(): bool
    {
        return $this->autoDel;
    }

    /**
     * @param bool $autoDel
     */
    public function setAutoDel(bool $autoDel): void
    {
        $this->autoDel = $autoDel;
    }

    /**
     * @return int
     */
    public function getDelayedQueueOnceHandlerCount(): int
    {
        return $this->delayedQueueOnceHandlerCount;
    }

    /**
     * @param int $delayedQueueOnceHandlerCount
     */
    public function setDelayedQueueOnceHandlerCount(int $delayedQueueOnceHandlerCount): void
    {
        $this->delayedQueueOnceHandlerCount = $delayedQueueOnceHandlerCount;
    }

    /**
     * @return int
     */
    public function getDelayedMessagesMaxWorkerCount(): int
    {
        return $this->delayedMessagesMaxWorkerCount;
    }

    /**
     * @param int $delayedMessagesMaxWorkerCount 0 disables, -1 enables on all processes
     */
    public function setDelayedMessagesMaxWorkerCount(int $delayedMessagesMaxWorkerCount): void
    {
        $this->delayedMessagesMaxWorkerCount = $delayedMessagesMaxWorkerCount;
    }

    /**
     * @return int Interval in seconds
     */
    public function getDelayedMessagesTimerInterval(): int
    {
        return $this->delayedMessagesTimerInterval;
    }

    /**
     * @param int $delayedMessagesTimerInterval Interval in seconds
     */
    public function setDelayedMessagesTimerInterval(int $delayedMessagesTimerInterval): void
    {
        $this->delayedMessagesTimerInterval = $delayedMessagesTimerInterval;
    }

    /**
     * @return string
     */
    public function getDelayedDataHashKey(): string
    {
        return $this->delayedDataHashKey;
    }

    /**
     * @param string $delayedDataHashKey
     */
    public function setDelayedDataHashKey(string $delayedDataHashKey): void
    {
        $this->delayedDataHashKey = $delayedDataHashKey;
    }

    /**
     * @return string
     */
    public function getDelayedTaskSetKey(): string
    {
        return $this->delayedTaskSetKey;
    }

    /**
     * @param string $delayedTaskSetKey
     */
    public function setDelayedTaskSetKey(string $delayedTaskSetKey): void
    {
        $this->delayedTaskSetKey = $delayedTaskSetKey;
    }

    /**
     * @return int One of the PENDING_PROCESSING_* constants
     */
    public function getPendingProcessingStrategy(): int
    {
        return $this->pendingProcessingStrategy;
    }

    /**
     * @param int $pendingProcessingStrategy One of the PENDING_PROCESSING_* constants
     */
    public function setPendingProcessingStrategy(int $pendingProcessingStrategy): void
    {
        $this->pendingProcessingStrategy = $pendingProcessingStrategy;
    }

    /**
     * @return int Interval in seconds
     */
    public function getCheckPendingTimerInterval(): int
    {
        return $this->checkPendingTimerInterval;
    }

    /**
     * @param int $checkPendingTimerInterval Interval in seconds
     */
    public function setCheckPendingTimerInterval(int $checkPendingTimerInterval): void
    {
        $this->checkPendingTimerInterval = $checkPendingTimerInterval;
    }

    /**
     * @return int
     */
    public function getOnceCheckPendingCount(): int
    {
        return $this->onceCheckPendingCount;
    }

    /**
     * @param int $onceCheckPendingCount
     */
    public function setOnceCheckPendingCount(int $onceCheckPendingCount): void
    {
        $this->onceCheckPendingCount = $onceCheckPendingCount;
    }

    /**
     * @return int Pending timeout in seconds
     */
    public function getPendingTimout(): int
    {
        return $this->pendingTimout;
    }

    /**
     * @param int $pendingTimout Pending timeout in seconds
     */
    public function setPendingTimout(int $pendingTimout): void
    {
        $this->pendingTimout = $pendingTimout;
    }

    /**
     * Creates a producer bound to a fresh, default-configured instance of the
     * called consumer class.
     *
     * @return QueueProducerInterface
     * @throws RedisException
     * @throws Throwable
     */
    public static function createQueueProducer(): QueueProducerInterface
    {
        return QueueProducerFactory::create(new static());
    }

    /**
     * Creates a queue message for a fresh, default-configured instance of the
     * called consumer class.
     *
     * @param mixed $data Message payload, passed through to the message factory
     * @return QueueMessageInterface
     * @throws RedisException
     * @throws Throwable
     */
    public static function createQueueMessage($data): QueueMessageInterface
    {
        return QueueMessageFactory::create(new static(), $data);
    }

    /**
     * Business logic for one message; implemented by every concrete consumer.
     *
     * @param ConsumerMessageInterface $consumerMessage
     */
    abstract public function consume(ConsumerMessageInterface $consumerMessage);

    /**
     * Handles a failed message by scheduling a retry.
     *
     * When the message already failed $maxAttempts times it is handed to the
     * dead-letter handler instead. Otherwise the fail count is incremented and
     * the message is re-published to the delayed queue with a delay of
     * failCount * $retrySeconds.
     *
     * @param string $messageId
     * @param ConsumerMessageInterface $consumerMessage
     * @param Throwable $e The error that caused the failure
     * @return bool True when the message was dead-lettered; otherwise the
     *              result of scheduling the delayed message
     * @throws RedisException
     * @throws ScheduleDelayedMessageException
     * @throws Throwable
     */
    public function handlerFailRetry(string $messageId, ConsumerMessageInterface $consumerMessage, Throwable $e): bool
    {
        $queueMessage = $consumerMessage->getQueueMessage();

        // Retry budget exhausted: dead-letter the message instead of retrying
        if ($queueMessage->getFailCount() >= $this->maxAttempts) {
            $this->handlerDeadLetterQueue($messageId, $consumerMessage, $e);
            return true;
        }

        $queueMessage->incrementFailCount(); // Fail count + 1

        // The delay grows linearly with the number of failures
        $retrySeconds = $queueMessage->getFailCount() * $this->retrySeconds;

        // Update the next retry time
        $queueMessage->updateNextRetry($retrySeconds);

        // Set the message delay
        $queueMessage->setDelay($retrySeconds);

        // Reuse the message id to avoid duplicate tasks
        $queueMessage->setIdentifier($messageId);

        // Re-publish to the delayed queue through this instance, so that
        // configuration applied via setters is honoured (a new static() would
        // fall back to the class defaults).
        return QueueProducerFactory::create($this)->scheduleDelayedMessage($queueMessage);
    }

    /**
     * Handles a pending timeout. Triggered when the pending list contains a
     * message that was not acknowledged within the timeout.
     *
     * $consumerName and $elapsedTime are not used by this default
     * implementation (apart from logging) but are available to overrides.
     *
     * @param string $messageId
     * @param ConsumerMessageInterface $consumerMessage
     * @param string $consumerName
     * @param int $elapsedTime
     * @param int $deliveryCount Number of times the message has been delivered
     * @return void
     * @throws RedisException
     * @throws ScheduleDelayedMessageException
     * @throws Throwable
     */
    public function handlerPendingTimeoutMessages(string $messageId, ConsumerMessageInterface $consumerMessage, string $consumerName, int $elapsedTime, int $deliveryCount): void
    {
        $strategy = $this->getPendingProcessingStrategy();

        switch ($strategy) {
            case self::PENDING_PROCESSING_IGNORE: // Ignore the pending timeout

                // Acknowledge the message
                $consumerMessage->ack();

                // Trigger dead-letter handling
                $this->handlerDeadLetterQueue($messageId, $consumerMessage, new Exception(
                    'PENDING_PROCESSING_IGNORE: Message pending timeout.'
                ));
                break;
            case self::PENDING_PROCESSING_RETRY: // Retry after the pending timeout

                // Another delivery would exceed the retry budget: dead-letter instead
                if ($deliveryCount + 1 > $this->getMaxAttempts()) {
                    $consumerMessage->ack();

                    $this->handlerDeadLetterQueue(
                        $messageId,
                        $consumerMessage,
                        new Exception(
                            'PENDING_PROCESSING_RETRY: The number of message delivery times exceeds the maximum number of retries.'
                        )
                    );

                    return;
                }

                // Schedule the retry
                $handlerStatus = $this->handlerFailRetry(
                    $messageId,
                    $consumerMessage,
                    new Exception('PENDING_PROCESSING_RETRY: Message pending timeout retry.')
                );

                // Only acknowledge once the retry has been handled successfully
                if ($handlerStatus) {
                    $consumerMessage->ack();
                }
                break;
            default:
                // An unknown strategy leaves the message pending; report it
                // rather than skipping it silently.
                LogUtility::warning('pending_timeout_unknown_strategy: ', [
                    'messageId' => $messageId,
                    'consumerName' => $consumerName,
                    'strategy' => $strategy,
                ]);
                break;
        }
    }

    /**
     * Dead-letter handling. The default implementation only writes a warning
     * log entry with the message and the error.
     *
     * @param string $messageId
     * @param ConsumerMessageInterface $consumerMessage
     * @param Throwable $e
     * @return void
     */
    public function handlerDeadLetterQueue(string $messageId, ConsumerMessageInterface $consumerMessage, Throwable $e): void
    {
        $queueMessage = $consumerMessage->getQueueMessage();

        // Write the log entry
        LogUtility::warning('dead_letter_queue: ', [
            'messageId' => $messageId,
            'message' => $queueMessage->toArray(),
            'failCount' => $queueMessage->getFailCount(),
            'errorMsg' => $e->getMessage(),
            'trace' => $e->getTraceAsString()
        ]);
    }
}
